Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add awaitable_builder decorator #239

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from

Conversation

superstar54
Copy link
Member

@superstar54 superstar54 commented Aug 18, 2024

This PR adds an awatiable_builder decorator; the decorated function will submit an AiiDA Process and return the ProcessNode.

  • The task itself will finished immediately
  • The WorkGraph will wait for the return process.

Note: The user needs to use the modified submit (from aiida_workgraph.engine.utils import submit), so that the task can submit the process inside the WorkGraph without using the self.submit.

The primary purpose is to allow the WorkGraph to submit other jobs.

submission_controller using WorkGraph.

Here is an example to submit the PW calculation for a list of structures inside a AiiDA group. The maximum running process is set to 2. Similar to this example from aiida-submission-controller.

from aiida_workgraph import WorkGraph, task
from aiida import load_profile, orm
from ase.build import bulk

load_profile()

@task(outputs=[{"name": "to_submit_group"}, {"name": "submitted_group"}])
def prepare_input(cell_sizes, to_submit_group_label, submitted_group_label):
    """A function that creates a group of structures to submit.
    """
    to_submit_group, created = orm.Group.collection.get_or_create(to_submit_group_label)
    submitted_group, _ = orm.Group.collection.get_or_create(submitted_group_label)
    if created:
        for cell_size in cell_sizes:
            structure = orm.StructureData(ase=bulk("Al", a=cell_size, cubic=True))
            structure.store()
            to_submit_group.add_nodes(structure)
    return {"to_submit_group": to_submit_group, "submitted_group": submitted_group}

@task(outputs=[{"name": "should_run"}, {"name": "structure"}])
def find_next(to_submit_group, submitted_group):
    """A function that checks if there are any structures that need to be submitted.
    """
    #find the difference between the two groups
    pks = [node.pk for node in to_submit_group.nodes]
    pks2 = [node.pk for node in submitted_group.nodes]
    extras_to_run = list(set(pks).difference(pks2))

    if len(extras_to_run) == 0:
        return {"should_run": False, "structure": None}

    structure = orm.load_node(extras_to_run[0])
    
    return {"should_run": True, "structure": structure}

@task.awaitable_builder()
def submit_job(structure, code, protocol, submitted_group):
    """
    This function is responsible for submitting a task to the scheduler.
    """
    from aiida_quantumespresso.workflows.pw.base import PwBaseWorkChain
    from aiida_workgraph.engine.utils import submit

    builder = PwBaseWorkChain.get_builder_from_protocol(code,
            structure=structure,
            protocol=protocol)
    node = submit(builder)
    submitted_group.add_nodes(structure)
    return {f"structure_{structure.pk}": node}

pw_code = orm.load_code("qe-7.2-pw@localhost")

#------------------------------------------------------------------------------------------------------------
wg = WorkGraph("test_submission_controller")
prepare_input1 = wg.add_task(prepare_input, name="prepare_input",
                             to_submit_group_label="structures",
                             submitted_group_label="submitted_structures",
                             cell_sizes=(3.9, 4.0, 4.1, 4.2, 4.3, 4.4),
)
find_next1 = wg.add_task(find_next, name="find_next1", to_submit_group=prepare_input1.outputs["to_submit_group"],
                        submitted_group=prepare_input1.outputs["submitted_group"])
# Create a while zone
while1 = wg.add_task("While", name="While", conditions=find_next1.outputs["should_run"])
submit_job1 = wg.add_task(submit_job, name="submit_job", code=pw_code,
                        structure=find_next1.outputs["structure"],
                        protocol="fast",
                        submitted_group=prepare_input1.outputs["submitted_group"])
while1.children.add([submit_job1])
# Set the maximum running process is set to 2
wg.max_number_jobs = 2
#------------------------------------------------------------------------------------------------------------
wg.submit()

WorkGraph

Screenshot from 2024-08-19 14-04-19

Timeline

We can see that the maximum number of running processes is 2 for all the time.

Screenshot from 2024-08-19 14-08-39

The examples can be found here:https://github.com/superstar54/aiida-submission-controller/blob/feature/workgraph/examples/workgraph_group.py

@superstar54 superstar54 changed the title Add "awatiable_builder" decorator Add awatiable_builder decorator Aug 18, 2024
@superstar54 superstar54 marked this pull request as draft August 18, 2024 12:24
@superstar54 superstar54 force-pushed the feature/submission-controller branch from 0e00897 to 2774f56 Compare August 18, 2024 16:17
@codecov-commenter
Copy link

codecov-commenter commented Aug 18, 2024

Codecov Report

Attention: Patch coverage is 34.32836% with 44 lines in your changes missing coverage. Please review.

Project coverage is 79.08%. Comparing base (5937b88) to head (7f5acd2).
Report is 50 commits behind head on main.

Files Patch % Lines
aiida_workgraph/engine/utils.py 39.39% 20 Missing ⚠️
aiida_workgraph/engine/workgraph.py 29.16% 17 Missing ⚠️
aiida_workgraph/decorator.py 30.00% 7 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #239      +/-   ##
==========================================
+ Coverage   75.75%   79.08%   +3.33%     
==========================================
  Files          70       65       -5     
  Lines        4615     4959     +344     
==========================================
+ Hits         3496     3922     +426     
+ Misses       1119     1037      -82     
Flag Coverage Δ
python-3.11 79.00% <34.32%> (+3.34%) ⬆️
python-3.9 79.04% <34.32%> (+3.30%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@superstar54 superstar54 linked an issue Aug 18, 2024 that may be closed by this pull request
@superstar54 superstar54 force-pushed the feature/submission-controller branch from 6905686 to adffe35 Compare August 19, 2024 04:38
@superstar54
Copy link
Member Author

Hi @mbercx , I've created a submission controller using WorkGraph. Could you please review the example above and let me know if it makes sense to you? I'd appreciate your feedback.

@superstar54 superstar54 force-pushed the main branch 3 times, most recently from ece97b9 to 98bc7da Compare August 26, 2024 12:00
@agoscinski agoscinski changed the title Add awatiable_builder decorator Add awaitable_builder decorator Sep 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement submission controller using WorkGraph
2 participants